#!/usr/bin/env bash
# This script is used to run all of the examples. Mostly to be used by travis for testing
# Output the commands we run
set -x
# If any command fails, fail
set -e
# Build everything
./sbt/sbt compile package assembly > sbtlog || (echo "sbt failed" && cat ./sbtlog && exit 1)
KAFKA_ROOT=./kafka_2.9.2-0.8.1.1
SPARK_SUBMIT_SCRIPT=$SPARK_HOME/bin/spark-submit
ASSEMBLY_JAR=./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar
# Mini cleanup
rm -rf /tmp/py; mkdir -p /tmp/py
rm -rf /tmp/java; mkdir -p /tmp/java
rm -rf /tmp/scala; mkdir -p /tmp/scala
# setup cassandra
# cqlsh --file ./files/cqlsh_setup &
# Scala
echo "Running Scala programs"
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.LoadJsonWithSparkSQL $ASSEMBLY_JAR local ./files/pandainfo.json 
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.ChapterSixExample  $ASSEMBLY_JAR local ./files/callsigns ./files/callsigns /tmp/scala/ch6out
TWITTER_DATA=./files/testweet.json
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.SparkSQLTwitter  $ASSEMBLY_JAR  "$TWITTER_DATA"  /tmp/scala/tweetout
#$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.BasicQueryCassandra $ASSEMBLY_JAR local localhost
echo "Running Scala streaming program"
./bin/fakelogs.sh &
sleep 1
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.StreamingLogInput $ASSEMBLY_JAR local[4]
echo "Running Scala Kafka streaming example"
$SPARK_SUBMIT_SCRIPT --master local[4] --class com.oreilly.learningsparkexamples.scala.KafkaInput $ASSEMBLY_JAR localhost:2181 spark-readers pandas 1 &
KAFKA_PID=$!
sleep 1
echo "panda\nerror panda" | $KAFKA_ROOT/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pandas
wait $KAFKA_PID
echo "Running Scala Flume example"
$SPARK_SUBMIT_SCRIPT --master local[4] --class com.oreilly.learningsparkexamples.scala.FlumeInput $ASSEMBLY_JAR localhost 7788 &
FLUME_PID=$!
sleep 1
echo "panda\nerror panda\n" | nc localhost 44444
sleep 3
echo "panda2\nerror panda2\n" | nc localhost 44444
wait $FLUME_PID
# Python
echo "Running Python programs"
$SPARK_SUBMIT_SCRIPT ./src/python/AvgMapPartitions.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicAvg.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicFilterMap.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicKeyValueMapFilter.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicMapPartitions.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicMap.py local
$SPARK_SUBMIT_SCRIPT ./src/python/ChapterSixExample.py local ./files/callsigns /tmp/py/pandaout
$SPARK_SUBMIT_SCRIPT ./src/python/SparkSQLTwitter.py ./files/testweet.json /tmp/py/tweetout
$SPARK_SUBMIT_SCRIPT ./src/python/LoadCsv.py local ./files/favourite_animals.csv /tmp/py/panda_lovers.csv
$SPARK_SUBMIT_SCRIPT ./src/python/MakeHiveTable.py local ./files/int_string.csv pandaplural
# Temporarily disabled due to API changes
#$SPARK_SUBMIT_SCRIPT ./src/python/LoadHive.py local pandaplural
$SPARK_SUBMIT_SCRIPT ./src/python/LoadJson.py local ./files/pandainfo.json /tmp/py/loadjsonout
$SPARK_SUBMIT_SCRIPT ./src/python/PerKeyAvg.py local
$SPARK_SUBMIT_SCRIPT ./src/python/RemoveOutliers.py local
$SPARK_SUBMIT_SCRIPT ./src/python/WordCount.py local
$SPARK_SUBMIT_SCRIPT ./src/python/MakeParquetFile.py local ./files/favourite_animals.csv /tmp/py/favouriteanimal_parquet
$SPARK_SUBMIT_SCRIPT ./src/python/QueryParquetFile.py local /tmp/py/favouriteanimal_parquet

# Java
echo "Running Java programs"
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL $ASSEMBLY_JAR local ./files/pandainfo.json
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.ChapterSixExample $ASSEMBLY_JAR local ./files/callsigns ./files/callsigns /tmp/java/ch6out
./sbt/sbt assembly && $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.SparkSQLTwitter  $ASSEMBLY_JAR  ./files/testweet.json  /tmp/java/tweetout
#$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.BasicQueryCassandra $ASSEMBLY_JAR local localhost
echo "Running Java streaming program"
./bin/fakelogs.sh &
sleep 1
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.StreamingLogInput $ASSEMBLY_JAR local[4]
sleep 5
echo "Running Java Kafka streaming example"
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.KafkaInput $ASSEMBLY_JAR localhost:2181 spark-java-readers
echo "panda\nerror panda" | $KAFKA_ROOT/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pandas

echo "Done running all programs :)"
